package cn.doitedu.rtmk.functions;

import cn.doitedu.common.interfaces.RuleCalculator;
import cn.doitedu.common.pojo.EventBean;
import cn.doitedu.rtmk.pojo.RuleMetaBean;
import cn.doitedu.rtmk.utils.StateDescriptors;
import com.alibaba.fastjson.JSON;
import groovy.lang.GroovyClassLoader;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Applies dynamically delivered rules to a keyed stream of user events.
 *
 * <p>Rule metadata arrives on the broadcast side and is stored in the broadcast state
 * described by {@code StateDescriptors.ruleMetaBroadCastStateDesc}, keyed by rule id.
 * For every rule a {@link RuleCalculator} is instantiated from the Groovy source carried
 * in its {@link RuleMetaBean} and cached in {@link #ruleCalculatorHashMap}.
 *
 * <p>The calculator cache is a plain field, not Flink state. The broadcast state is
 * therefore treated as the source of truth: any rule present there but missing from the
 * cache gets its calculator re-created on demand in {@link #processElement}.
 */
public class RuleProcessFunction extends KeyedBroadcastProcessFunction<Integer, EventBean, RuleMetaBean, String> {

    /** Operate type that marks a rule as newly added; every other value removes the rule. */
    private static final String OPERATE_TYPE_INSERT = "I";

    /** Cache of instantiated rule calculators, keyed by rule id. */
    HashMap<String,RuleCalculator> ruleCalculatorHashMap = new HashMap<String,RuleCalculator>();

    /**
     * Runs the current user event through the calculator of every rule held in broadcast state.
     *
     * @param eventBean the user event to evaluate
     * @param ctx       read-only context giving access to the rule broadcast state
     * @param out       collector handed to each calculator for emitting results
     * @throws Exception if the broadcast state cannot be read, a calculator cannot be
     *                   instantiated or opened, or a calculator fails on the event
     */
    @Override
    public void processElement(EventBean eventBean, KeyedBroadcastProcessFunction<Integer, EventBean, RuleMetaBean, String>.ReadOnlyContext ctx, Collector<String> out) throws Exception {

        ReadOnlyBroadcastState<String, RuleMetaBean> broadcastState = ctx.getBroadcastState(StateDescriptors.ruleMetaBroadCastStateDesc);

        // Iterate the broadcast state rather than the calculator cache, so that rules whose
        // metadata is in state but whose calculator is not cached are still executed.
        for (Map.Entry<String, RuleMetaBean> entry : broadcastState.immutableEntries()) {
            String ruleId = entry.getKey();
            RuleMetaBean metaBean = entry.getValue();

            // Fetch the calculator for this rule, re-creating it from the stored Groovy code if absent
            RuleCalculator ruleCalculator = ruleCalculatorHashMap.get(ruleId);
            if (ruleCalculator == null) {
                ruleCalculator = instantiateCalculator(metaBean);
                ruleCalculatorHashMap.put(ruleId, ruleCalculator);
            }

            // Initialize the calculator on first use
            if (!ruleCalculator.isInitialed()) {
                ruleCalculator.open(getRuntimeContext(), JSON.parseObject(metaBean.getRuleParamJson()), metaBean.getStaticProfileBitmap());
            }

            // Let this rule's calculator process the current user event
            ruleCalculator.processEvent(eventBean, out);
        }

    }

    /**
     * Applies a rule management record: adds the rule when its operate type is {@code "I"},
     * otherwise removes (takes offline) the rule with the same id.
     *
     * @param metaBean the rule metadata record received from the broadcast stream
     * @param ctx      context giving write access to the rule broadcast state
     * @param out      unused by this method
     * @throws Exception if the Groovy code cannot be compiled or instantiated, or the
     *                   broadcast state cannot be updated
     */
    @Override
    public void processBroadcastElement(RuleMetaBean metaBean, KeyedBroadcastProcessFunction<Integer, EventBean, RuleMetaBean, String>.Context ctx, Collector<String> out) throws Exception {

        BroadcastState<String, RuleMetaBean> broadcastState = ctx.getBroadcastState(StateDescriptors.ruleMetaBroadCastStateDesc);

        // Constant-first comparison: a null operate type falls through to the removal branch
        // instead of throwing a NullPointerException.
        if (OPERATE_TYPE_INSERT.equals(metaBean.getOperateType())) {
            // Add rule: build the calculator first, so a compilation failure leaves
            // both the broadcast state and the cache untouched.
            RuleCalculator ruleCalculator = instantiateCalculator(metaBean);

            // Store the metadata in broadcast state
            broadcastState.put(metaBean.getRuleId(), metaBean);
            // Cache the calculator
            ruleCalculatorHashMap.put(metaBean.getRuleId(), ruleCalculator);
        } else {
            // Remove (take offline) the rule
            ruleCalculatorHashMap.remove(metaBean.getRuleId());
            broadcastState.remove(metaBean.getRuleId());
        }
    }

    /**
     * Compiles the Groovy source carried by the given rule metadata and instantiates it
     * through its no-argument constructor.
     *
     * @param metaBean rule metadata holding the Groovy source of the calculator
     * @return a new, not yet opened calculator instance
     * @throws Exception if compilation or reflective instantiation fails, or the compiled
     *                   class is not a {@link RuleCalculator}
     */
    private RuleCalculator instantiateCalculator(RuleMetaBean metaBean) throws Exception {
        GroovyClassLoader groovyClassLoader = new GroovyClassLoader();
        Class<?> calculatorClass = groovyClassLoader.parseClass(metaBean.getGroovyCode());
        // Class.newInstance() is deprecated; go through the declared no-arg constructor instead
        return (RuleCalculator) calculatorClass.getDeclaredConstructor().newInstance();
    }
}
